3c03332ad039d2cbef9082ac6604c2fd55a4f52b,embulk-output-jdbc/src/main/java/org/embulk/output/jdbc/AbstractJdbcOutputPlugin.java,AbstractJdbcOutputPlugin,doBegin,#JdbcOutputConnection#PluginTask#Schema#number#,424
Before Change
// direct modify mode doesn't need intermediate tables.
ImmutableList.Builder<String> intermTableNames = ImmutableList.builder();
if (mode.tempTablePerTask()) {
String namePrefix = generateIntermediateTableNamePrefix(task.getTable(), con, 3,
task.getFeatures().getMaxTableNameLength(), task.getFeatures().getTableNameLengthSemantics());
for (int i=0; i < taskCount; i++) {
intermTableNames.add(namePrefix + String.format("%03d", i));
After Change
logger.info("Using {} mode", mode);
// Decide which table name to operate on. The configured name wins when a table
// with exactly that spelling exists; otherwise its upper-case and lower-case
// spellings are probed, and the result is stored via task.setActualTable().
if (con.tableExists(task.getTable())) {
    task.setActualTable(task.getTable());
} else {
    // Use Locale.ROOT so the case mapping does not depend on the JVM's default
    // locale (under a Turkish locale, for instance, "id" upper-cases to "İD",
    // which would make the probe look for the wrong identifier).
    String upperTable = task.getTable().toUpperCase(java.util.Locale.ROOT);
    String lowerTable = task.getTable().toLowerCase(java.util.Locale.ROOT);
    // Both spellings are always probed, upper-case first.
    boolean upperTableExists = con.tableExists(upperTable);
    boolean lowerTableExists = con.tableExists(lowerTable);
    if (upperTableExists && lowerTableExists) {
        // Ambiguous: the configured name matches neither exactly, yet both
        // case variants exist, so there is no safe choice.
        throw new ConfigException(String.format("Cannot specify table '%s' because both '%s' and '%s' exist.",
                task.getTable(), upperTable, lowerTable));
    } else if (upperTableExists) {
        task.setActualTable(upperTable);
    } else if (lowerTableExists) {
        task.setActualTable(lowerTable);
    } else {
        // No spelling exists: keep the name exactly as configured.
        task.setActualTable(task.getTable());
    }
}
Optional<JdbcSchema> initialTargetTableSchema =
mode.ignoreTargetTableSchema() ?
Optional.<JdbcSchema>absent() :
newJdbcSchemaFromTableIfExists(con, task.getActualTable());
// TODO get CREATE TABLE statement from task if set
JdbcSchema newTableSchema = applyColumnOptionsToNewTableSchema(
initialTargetTableSchema.or(new Supplier<JdbcSchema>() {
public JdbcSchema get()
{
return newJdbcSchemaForNewTable(schema);
}
}),
task.getColumnOptions());
// create intermediate tables
if (!mode.isDirectModify()) {
// direct modify mode doesn't need intermediate tables.
ImmutableList.Builder<String> intermTableNames = ImmutableList.builder();
if (mode.tempTablePerTask()) {
String namePrefix = generateIntermediateTableNamePrefix(task.getActualTable(), con, 3,
task.getFeatures().getMaxTableNameLength(), task.getFeatures().getTableNameLengthSemantics());
for (int i=0; i < taskCount; i++) {
intermTableNames.add(namePrefix + String.format("%03d", i));
}
} else {
String name = generateIntermediateTableNamePrefix(task.getActualTable(), con, 0,
task.getFeatures().getMaxTableNameLength(), task.getFeatures().getTableNameLengthSemantics());
intermTableNames.add(name);
}
// create the intermediate tables here
task.setIntermediateTables(Optional.<List<String>>of(intermTableNames.build()));
for (String name : task.getIntermediateTables().get()) {
// DROP TABLE IF EXISTS xyz__0000000054d92dee1e452158_bulk_load_temp
con.dropTableIfExists(name);
// CREATE TABLE IF NOT EXISTS xyz__0000000054d92dee1e452158_bulk_load_temp
con.createTableIfNotExists(name, newTableSchema);
}
} else {
task.setIntermediateTables(Optional.<List<String>>absent());
}
// build JdbcSchema from a table
// Three sources, in priority order: the schema already read from the target
// table, the first intermediate table, or a freshly created target table.
JdbcSchema targetTableSchema;
// Value handed to task.setNewTableSchema(): present only when the schema was
// read back from an intermediate table.
Optional<JdbcSchema> newTableSchemaForTask;
if (initialTargetTableSchema.isPresent()) {
    targetTableSchema = initialTargetTableSchema.get();
    newTableSchemaForTask = Optional.<JdbcSchema>absent();
} else if (!task.getIntermediateTables().isPresent() || task.getIntermediateTables().get().isEmpty()) {
    // No intermediate tables to read from.
    // also create the target table if not exists
    // CREATE TABLE IF NOT EXISTS xyz
    con.createTableIfNotExists(task.getActualTable(), newTableSchema);
    targetTableSchema = newJdbcSchemaFromTableIfExists(con, task.getActualTable()).get();
    newTableSchemaForTask = Optional.<JdbcSchema>absent();
} else {
    // Read the schema back from the first intermediate table.
    String firstIntermTable = task.getIntermediateTables().get().get(0);
    targetTableSchema = newJdbcSchemaFromTableIfExists(con, firstIntermTable).get();
    newTableSchemaForTask = Optional.of(newTableSchema);
}
task.setNewTableSchema(newTableSchemaForTask);
// Align the table's columns with the input schema by column name.
task.setTargetTableSchema(matchSchemaByColumnNames(schema, targetTableSchema));